热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

实战|使用SparkStreaming写入Hudi

1.项目背景传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导

1. 项目背景

传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。

然而实时同步数仓从一开始就面临如下几个挑战:

  • 小文件问题。不论是spark的microbatch模式,还是flink的逐条处理模式,每次写入HDFS时都是几M甚至几十KB的文件。长时间下来产生的大量小文件,会对HDFS namenode产生巨大的压力。

  • 对update操作的支持。HDFS系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。

  • 事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序commit操作时一次性写入HDFS,当程序rollback时,已写入或部分写入的数据能随之删除。

Hudi是针对以上问题的解决方案之一。以下是对Hudi的简单介绍,主要内容翻译自官网。

2. Hudi简介

2.1 时间线(Timeline)

Hudi内部按照操作时刻(instant)对表的所有操作维护了一条时间线,由此可以提供表在某一时刻的视图,还能够高效的提取出延后到达的数据。每一个时刻包含:

  • 时刻行为:对表操作的类型,包含:

commit:提交,将批次的数据原子性的写入表;

clean: 清除,后台作业,不断清除不需要的旧得版本的数据;

delta_commit:delta 提交是将批次记录原子性的写入MergeOnRead表中,数据写入的目的地是delta日志文件;

compacttion:压缩,后台作业,将不同结构的数据,例如记录更新操作的行式存储的日志文件合并到列式存储的文件中。压缩本身是一个特殊的commit操作;

rollback:回滚,一些不成功时,删除所有部分写入的文件;

savepoint:保存点,标志某些文件组为“保存的“,这样cleaner就不会删除这些文件;

  • 时刻时间:操作开始的时间戳;

  • 状态:时刻的当前状态,包含:

requested 某个操作被安排执行,但尚未初始化

inflight 某个操作正在执行

completed 某一个操作在时间线上已经完成

Hudi保证按照时间线执行的操作按照时刻时间具有原子性及时间线一致性。

2.2 文件管理

Hudi表存在在DFS系统的 base path(用户写入Hudi时自定义) 目录下,在该目录下被分成不同的分区。每一个分区以 partition path 作为唯一的标识,组织形式与Hive相同。

每一个分区内,文件通过唯一的 FileId 文件id 划分到 FileGroup 文件组。每一个FileGroup包含多个 FileSlice 文件切片,每一个切片包含一个由commit或compaction操作形成的base file 基础文件(parquet文件),以及包含对基础文件进行inserts/update操作的log files 日志文件(log文件)。Hudi采用了MVCC设计,compaction操作会将日志文件和对应的基础文件合并成新的文件切片,clean操作则删除无效的或老版本的文件。

2.3 索引

Hudi通过映射Hoodie键(记录键+ 分区路径)到文件id,提供了高效的upsert操作。当第一个版本的记录写入文件时,这个记录键值和文件的映射关系就不会发生任何改变。换言之,映射的文件组始终包含一组记录的所有版本。

2.4 表类型&查询

Hudi表类型定义了数据是如何被索引、分布到DFS系统,以及以上基本属性和时间线事件如何施加在这个组织上。查询类型定义了底层数据如何暴露给查询。

表类型支持的查询类型
Copy On Write写时复制快照查询 + 增量查询
Merge On Read读时合并快照查询 + 增量查询 + 读取优化

2.4.1 表类型

Copy On Write:仅采用列式存储文件(parquet)存储文件。更新数据时,在写入的同时同步合并文件,仅仅修改文件的版次并重写。

Merge On Read:采用列式存储文件(parquet)+行式存储文件(avro)存储数据。更新数据时,新数据被写入delta文件并随后以异步或同步的方式合并成新版本的列式存储文件。

取舍CopyOnWriteMergeOnRead
数据延迟
Update cost (I/O)更新操作开销(I/O)高(重写整个parquet)低(追加到delta记录)
Parquet文件大小小(高更新(I/O)开销)大(低更新开销)
写入频率低(取决于合并策略)

2.4.2 查询类型

  • 快照查询:查询会看到以后的提交操作和合并操作的最新的表快照。对于merge on read表,会将最新的基础文件和delta文件进行合并,从而会看到近实时的数据(几分钟的延迟)。对于copy on write表,当存在更新/删除操作时或其他写操作时,会直接代替已有的parquet表。

  • 增量查询:查询只会看到给定提交/合并操作之后新写入的数据。由此有效的提供了变更流,从而实现了增量数据管道。

  • 读优化查询:查询会看到给定提交/合并操作之后表的最新快照。只会查看到最新的文件切片中的基础/列式存储文件,并且保证和非hudi列式存储表相同的查询效率。

取舍快照读取优化
数据延迟
查询延迟高(合并基础/列式存储文件 + 行式存储delta 日志 文件)低(原有的基础/列式存储文件查询性能)

3. Spark结构化流写入Hudi

以下是整合spark结构化流+hudi的示意代码,由于Hudi OutputFormat目前只支持在spark rdd对象中调用,因此写入HDFS操作采用了spark structured streaming的forEachBatch算子。具体说明见注释。

  1. package pers.machi.sparkhudi


  2. import org.apache.log4j.Logger

  3. import org.apache.spark.sql.catalyst.encoders.RowEncoder

  4. import org.apache.spark.sql.{DataFrame, Row, SaveMode}

  5. import org.apache.spark.sql.functions._

  6. import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}


  7. object SparkHudi {

  8. val logger = Logger.getLogger(SparkHudi.getClass)


  9. def main(args: Array[String]): Unit = {


  10. val spark = SparkSession

  11. .builder

  12. .appName("SparkHudi")

  13. //.master("local[*]")

  14. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  15. .config("spark.default.parallelism", 9)

  16. .config("spark.sql.shuffle.partitions", 9)

  17. .enableHiveSupport()

  18. .getOrCreate()


  19. // 添加监听器,每一批次处理完成,将该批次的相关信息,如起始offset,抓取记录数量,处理时间打印到控制台

  20. spark.streams.addListener(new StreamingQueryListener() {

  21. override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {

  22. println("Query started: " + queryStarted.id)

  23. }

  24. override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {

  25. println("Query terminated: " + queryTerminated.id)

  26. }

  27. override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {

  28. println("Query made progress: " + queryProgress.progress)

  29. }

  30. })


  31. // 定义kafka流

  32. val dataStreamReader = spark

  33. .readStream

  34. .format("kafka")

  35. .option("kafka.bootstrap.servers", "localhost:9092")

  36. .option("subscribe", "testTopic")

  37. .option("startingOffsets", "latest")

  38. .option("maxOffsetsPerTrigger", 100000)

  39. .option("failOnDataLoss", false)


  40. // 加载流数据,这里因为只是测试使用,直接读取kafka消息而不做其他处理,是spark结构化流会自动生成每一套消息对应的kafka元数据,如消息所在主题,分区,消息对应offset等。

  41. val df = dataStreamReader.load()

  42. .selectExpr(

  43. "topic as kafka_topic"

  44. "CAST(partition AS STRING) kafka_partition",

  45. "cast(timestamp as String) kafka_timestamp",

  46. "CAST(offset AS STRING) kafka_offset",

  47. "CAST(key AS STRING) kafka_key",

  48. "CAST(value AS STRING) kafka_value",

  49. "current_timestamp() current_time",

  50. )

  51. .selectExpr(

  52. "kafka_topic"

  53. "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",

  54. "kafka_offset",

  55. "kafka_timestamp",

  56. "kafka_key",

  57. "kafka_value",

  58. "substr(current_time,1,10) partition_date")


  59. // 创建并启动query

  60. val query = df

  61. .writeStream

  62. .queryName("demo").

  63. .foreachBatch { (batchDF: DataFrame, _: Long) => {

  64. batchDF.persist()


  65. println(LocalDateTime.now() + "start writing cow table")

  66. batchDF.write.format("org.apache.hudi")

  67. .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")

  68. .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")

  69. // 以kafka分区和偏移量作为组合主键

  70. .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")

  71. // 以当前日期作为分区

  72. .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")

  73. .option(TABLE_NAME, "copy_on_write_table")

  74. .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)

  75. .mode(SaveMode.Append)

  76. .save("/tmp/sparkHudi/COPY_ON_WRITE")


  77. println(LocalDateTime.now() + "start writing mor table")

  78. batchDF.write.format("org.apache.hudi")

  79. .option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ")

  80. .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")

  81. .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")

  82. .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")

  83. .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")

  84. .option(TABLE_NAME, "merge_on_read_table")

  85. .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)

  86. .mode(SaveMode.Append)

  87. .save("/tmp/sparkHudi/MERGE_ON_READ")


  88. println(LocalDateTime.now() + "finish")

  89. batchDF.unpersist()

  90. }

  91. }

  92. .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")

  93. .start()


  94. query.awaitTermination()

  95. }

  96. }

4. 测试结果

受限于测试条件,这次测试没有考虑update操作,而仅仅是测试hudi对追加新数据的性能。

数据程序一共运行5天,期间未发生报错导致程序退出。

kafka每天读取数据约1500万条,被消费的topic共有9个分区。

几点说明如下

1 是否有数据丢失及重复

由于每条记录的分区+偏移量具有唯一性,通过检查同一分区下是否有偏移量重复及不连续的情况,可以断定数据不存丢失及重复消费的情况。

2 最小可支持的单日写入数据条数

数据写入效率,对于cow及mor表,不存在更新操作时,写入速率接近。这本次测试中,spark每秒处理约170条记录。单日可处理1500万条记录。

3 cow和mor表文件大小对比

每十分钟读取两种表同一分区小文件大小,单位M。结果如下图,mor表文件大小增加较大,占用磁盘资源较多。不存在更新操作时,尽可能使用cow表。




推荐阅读
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • 解决VS写C#项目导入MySQL数据源报错“You have a usable connection already”问题的正确方法
    本文介绍了在VS写C#项目导入MySQL数据源时出现报错“You have a usable connection already”的问题,并给出了正确的解决方法。详细描述了问题的出现情况和报错信息,并提供了解决该问题的步骤和注意事项。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • Postgresql备份和恢复的方法及命令行操作步骤
    本文介绍了使用Postgresql进行备份和恢复的方法及命令行操作步骤。通过使用pg_dump命令进行备份,pg_restore命令进行恢复,并设置-h localhost选项,可以完成数据的备份和恢复操作。此外,本文还提供了参考链接以获取更多详细信息。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
  • 花瓣|目标值_Compose 动画边学边做夏日彩虹
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Compose动画边学边做-夏日彩虹相关的知识,希望对你有一定的参考价值。引言Comp ... [详细]
  • 颜色迁移(reinhard VS welsh)
    不要谈什么天分,运气,你需要的是一个截稿日,以及一个不交稿就能打爆你狗头的人,然后你就会被自己的才华吓到。------ ... [详细]
author-avatar
太姥茶叶论坛_730
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有